package cn.spream.jstudy.dquartz;

import cn.spream.jstudy.dquartz.config.ZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

import java.util.concurrent.TimeUnit;

/**
 * 分布式Job基础类
 * Created with IntelliJ IDEA.
 * User: sjx
 * Date: 15/3/10
 * Time: 下午9:29
 * To change this template use File | Settings | File Templates.
 */
public abstract class DistributedJob implements LeaderSelectorListener {

    private ZkConfig zkConfig;
    private CuratorFramework curatorFramework;
    private LeaderSelector leaderSelector;

    public DistributedJob(ZkConfig zkConfig) {
        this.zkConfig = zkConfig;
    }

    /**
     * 开始leader选举
     */
    protected void start() {
        curatorFramework = CuratorFrameworkFactory.newClient(zkConfig.getAddress(), new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
        leaderSelector = new LeaderSelector(curatorFramework, getLeaderPath(zkConfig.getRootPath()), this);
        leaderSelector.autoRequeue();
        leaderSelector.start();
    }

    /**
     * 获取leader选举目录
     *
     * @param rootPath
     * @return
     */
    protected abstract String getLeaderPath(String rootPath);

    /**
     * 销毁释放zookeeper资源
     */
    protected void destroy() {
        CloseableUtils.closeQuietly(leaderSelector);
        CloseableUtils.closeQuietly(curatorFramework);
    }

    /**
     * 调度执行入口
     */
    public void execute() {
        if (leaderSelector.hasLeadership()) {
            doBusiness();
        } else {
            notBusiness();
        }
    }

    /**
     * 执行业务逻辑
     */
    protected abstract void doBusiness();

    /**
     * 不执行业务逻辑
     */
    protected abstract void notBusiness();

    /**
     * 获取到leader权限后的回调方法，退出则放弃leader权限
     *
     * @param curatorFramework
     * @throws Exception
     */
    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        while (leaderSelector.hasLeadership()) {
            TimeUnit.SECONDS.sleep(10);
        }
    }

    /**
     * 状态改变回调方法
     *
     * @param curatorFramework
     * @param connectionState
     */
    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if (connectionState == ConnectionState.SUSPENDED || connectionState == ConnectionState.LOST) {
            throw new CancelLeadershipException();
        }
    }

    public void setZkConfig(ZkConfig zkConfig) {
        this.zkConfig = zkConfig;
    }
}
